package cn.doitedu.dashboard;

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Flink streaming SQL job that computes the hottest search keywords per business line.
 *
 * <p>Pipeline, as expressed by the SQL below:
 * <ol>
 *   <li>Map the Kafka topic {@code dwd-user-action} (JSON values) to the table
 *       {@code dwd_user_action_kafka}.</li>
 *   <li>Keep only rows whose {@code event_id} is {@code 'search'}.</li>
 *   <li>Count searches per {@code (page_business, keyword)} in a hopping window of
 *       10 minutes that slides every second.</li>
 *   <li>Rank keywords inside each {@code (window, page_business)} by count and keep
 *       the first 10.</li>
 * </ol>
 */
public class Job02_HotSearchKeywordTopn {

    /**
     * Builds the environment, registers the source table and the filtered view, then
     * runs the Top-N query and prints its result.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint every 5000 ms with exactly-once semantics, stored on the local file system.
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("file:///d:/ckpt");
        env.setParallelism(1);

        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);


        // Create the table that maps the DWD-layer user-action detail wide table in Kafka.
        //
        // `rt` is the event-time attribute required by descriptor(rt) in the window query.
        // NOTE(review): action_time is assumed to be epoch milliseconds (precision 3) and the
        // watermark allows no out-of-orderness - confirm both against the upstream producer.
        tenv.executeSql(
                " CREATE TABLE dwd_user_action_kafka (                                    "+
                        " 	event_id STRING,                                              "+
                        " 	action_time BIGINT,                                           "+
                        " 	properties MAP<STRING,STRING>,                                "+
                        " 	keyword as properties['keyword'],                             "+
                        " 	page_business string,                                         "+
                        " 	rt as to_timestamp_ltz(action_time,3),                        "+
                        " 	watermark for rt as rt - interval '0' second                  "+
                        " ) WITH (                                                        "+
                        "     'connector' = 'kafka',                                      "+
                        "     'topic' = 'dwd-user-action',                                "+
                        "     'properties.bootstrap.servers' = 'doitedu:9092',            "+
                        "     'properties.group.id' = 'g008',                             "+
                        "     'scan.startup.mode' = 'latest-offset',                      "+
                        "     'value.format' = 'json',                                    "+
                        "     'value.fields-include' = 'EXCEPT_KEY'                       "+
                        " )				                                                  "
        );


        // Filter: keep search events only.
        tenv.executeSql("create temporary view filtered_view as " +
                "select * from dwd_user_action_kafka where event_id = 'search' ");


        // Aggregate and rank.
        // Sample rows (apparently: event_id, time, keyword, business line):
        // search,t1,coffee,mall business line
        // search,t1,coffee,mall business line
        // search,t1,milk,mall business line
        // search,t1,milk,mall business line
        // search,t1,braised pork ribs,takeout business line
        // search,t1,braised pork ribs,takeout business line
        // search,t1,braised hairtail,takeout business line
        // search,t1,braised kelp,takeout business line
        //
        // tmp  : search count per (window, page_business, keyword); hop = slide 1 second, size 10 minutes.
        // tmp2 : row_number per (window, page_business) ordered by search_cnt descending.
        // The outer select keeps rn <= 10.
        //
        // executeSql on a SELECT returns a TableResult; print() consumes it so the
        // Top-N rows are actually emitted (to stdout) instead of being discarded.
        tenv.executeSql(
                        " with tmp as (                                                                                               "+
                        " select                                                                                                      "+
                        "   window_start,                                                                                           "+
                        " 	window_end,                                                                                               "+
                        "   page_business,                                                                                          "+
                        " 	keyword,                                                                                                  "+
                        " 	count(1) as search_cnt                                                                                    "+
                        " from table(                                                                                                 "+
                        "     hop(table filtered_view,descriptor(rt),interval '1' second,interval '10' minute)                        "+
                        " )                                                                                                           "+
                        " group by                                                                                                    "+
                        "     window_start,                                                                                           "+
                        " 	  window_end,                                                                                             "+
                        "     page_business,                                                                                          "+
                        " 	  keyword                                                                                                 "+
                        " )                                                                                                           "+
                        "                                                                                                             "+
                        " ,tmp2 as (                                                                                                  "+
                        " select                                                                                                      "+
                        "     window_start,                                                                                           "+
                        " 	  window_end,                                                                                             "+
                        "     page_business,                                                                                          "+
                        " 	  keyword,                                                                                                "+
                        " 	  search_cnt,                                                                                             "+
                        " 	row_number() over(partition by window_start,window_end,page_business order by search_cnt desc ) as rn     "+
                        " from tmp )                                                                                                  "+
                        "                                                                                                             "+
                        "                                                                                                             "+
                        " select                                                                                                      "+
                        "    *                                                                                                        "+
                        " from tmp2                                                                                                   "+
                        " where rn<=10                                                                                                "

        ).print();

    }
}
